We're wasting a lot of time waiting for our iterators to produce minibatches when we're running epochs. It seems like we should precompute them while the current minibatch is being run on the GPU, which means using the multiprocessing module. Since I've never used it before, here are my dev notes for writing this into the dataset iterators.
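What I'm after is essentially double buffering: as soon as one minibatch is handed over, start preprocessing the next one in worker processes. A minimal sketch of the pattern, before getting into the details below (train_on_gpu is a hypothetical stand-in for the GPU step, and preprocess stands in for the real augmentation):

from multiprocessing import Pool

def preprocess(x):
    # stand-in for whatever augmentation the real iterator applies
    return x ** 2

if __name__ == '__main__':
    batches = [[1, 2], [3, 4], [5, 6]]
    p = Pool(4)
    # kick off preprocessing of the first batch
    pending = p.map_async(preprocess, batches[0])
    for i in range(len(batches)):
        batch = pending.get()  # wait for the precomputed batch
        if i + 1 < len(batches):
            # start on the next batch before this one is used
            pending = p.map_async(preprocess, batches[i + 1])
        # train_on_gpu(batch) would go here; the workers keep busy meanwhile
    p.close()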
In [1]:
import multiprocessing
import numpy as np
In [2]:
p = multiprocessing.Pool(4)
In [3]:
x = range(3)
In [4]:
f = lambda x: x*2
In [5]:
def f(x):
    return x**2
In [6]:
print(x)
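Part of why this fails is easy to pin down: the default pickler can only serialise a function by module and name, so handing the pool a lambda fails outright. A quick demonstration (my own, run as a script):

from multiprocessing import Pool

if __name__ == '__main__':
    p = Pool(2)
    try:
        p.map(lambda x: x * 2, [1, 2, 3])
    except Exception as e:
        # the lambda can't be pickled by name, so map blows up
        print(repr(e))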
So, for whatever reason, these can't be run directly in the notebook (presumably the worker processes can't unpickle functions defined in the interactive namespace). They have to be run as subprocesses using the %%python cell magic, which runs the whole cell in a separate Python process, like so:
In [9]:
%%python
from multiprocessing import Pool
def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
In [10]:
%%python
from multiprocessing import Pool
import numpy as np
def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, np.array([1, 2, 3])))
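One detail worth noting for later: Pool.map happily iterates over the array, but it hands back a plain list, which is why the shape-checking cell at the end has to wrap the output in np.array before reshaping. A quick check:

from multiprocessing import Pool
import numpy as np

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    out = p.map(f, np.array([1, 2, 3]))
    print(out)            # [1, 4, 9] -- a plain list, not an ndarray
    print(np.array(out))  # wrapped back up into an array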
Now doing this asynchronously: map_async returns an AsyncResult immediately instead of blocking, and the result is collected later with get.
In [29]:
%%python
from multiprocessing import Pool
import numpy as np
def f(x):
    return x**2

if __name__ == '__main__':
    p = Pool(5)
    r = p.map_async(f, np.array([0, 1, 2]))
    print(dir(r))
    print(r.get(timeout=1))
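The dir above is just to see what the AsyncResult offers. The pieces that matter for a precomputing iterator are get, wait, ready and successful:

from multiprocessing import Pool
import numpy as np

def f(x):
    return x**2

if __name__ == '__main__':
    p = Pool(5)
    r = p.map_async(f, np.array([0, 1, 2]))
    r.wait()                 # block until all the workers are done
    print(r.ready())         # True once the result has arrived
    print(r.successful())    # True if no worker raised an exception
    print(r.get(timeout=1))  # the actual result: [0, 1, 4]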
Now trying to create an iterable that will precompute its output using multiprocessing.
In [36]:
%%python
from multiprocessing import Pool
import numpy as np
def f(x):
    return x**2

class It(object):
    def __init__(self, a):
        # store an array (2D)
        self.a = a
        # initialise pool
        self.p = Pool(4)
        # initialise index
        self.i = 0
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f, self.a[self.i, :])

    def get(self):
        return self.batch.get(timeout=1)

    def f(self, x):
        # unused: the pool is given the module-level f above instead
        return x**2

if __name__ == '__main__':
    it = It(np.random.randn(4, 4))
    print(it.get())
In [42]:
%%python
from multiprocessing import Pool
import numpy as np
def f(x):
    return x**2

class It(object):
    def __init__(self, a):
        # store an array (2D)
        self.a = a
        # initialise pool
        self.p = Pool(4)
        # initialise index
        self.i = 0
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f, self.a[self.i, :])

    def __iter__(self):
        return self

    def next(self):
        # check if we've got something pre-computed to return
        if self.batch:
            # get the output
            output = self.batch.get(timeout=1)
            # prepare the next batch
            self.i += 1
            if self.i < self.a.shape[0]:
                # note: this forks a fresh pool of four processes for
                # every row and never closes the old one
                self.p = Pool(4)
                self.batch = self.p.map_async(f, self.a[self.i, :])
            else:
                self.batch = False
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    it = It(np.random.randn(4, 4))
    for a in it:
        print(a)
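One thing that nags about the version above: it forks a fresh Pool(4) on every iteration and never closes the old one, so worker processes pile up. The fix, which the later multiprocessing cells use, is to create the pool once and reuse it for every batch. A minimal sketch of the difference:

from multiprocessing import Pool

def f(x):
    return x**2

if __name__ == '__main__':
    p = Pool(4)                 # create the workers once
    for batch in ([1, 2], [3, 4], [5, 6]):
        print(p.map(f, batch))  # reuse the same workers each time
    p.close()
    p.join()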
Then we have to try to do a similar thing, but using the RandomAugment function. Of the following two cells, one uses multiprocessing and one doesn't. We test them by pretending to ask for a minibatch and then sleeping, applying the RandomAugment function each time.
In [96]:
%%time
%%python
from multiprocessing import Pool
import numpy as np
import neukrill_net.augment
import time
class It(object):
    def __init__(self, a, f):
        # store an array (3D stack of images)
        self.a = a
        # store the function
        self.f = f
        # initialise pool (never actually used in this serial version)
        self.p = Pool(4)
        # initialise indices
        self.inds = list(range(self.a.shape[0]))
        # pop a batch from the top
        self.batch_inds = [self.inds.pop(0) for _ in range(100)]
        # compute the first batch serially with the builtin map
        self.batch = map(self.f, self.a[self.batch_inds, :])

    def __iter__(self):
        return self

    def next(self):
        # check if we've got something pre-computed to return
        # (note: the final batch is dropped once inds empties)
        if self.inds != []:
            # get the output
            output = self.batch
            # prepare the next batch
            self.batch_inds = [self.inds.pop(0) for _ in range(100)]
            # leftover from the parallel version: forks four unused
            # processes every batch, which only slows this baseline down
            self.p = Pool(4)
            self.batch = map(self.f, self.a[self.batch_inds, :])
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    f = neukrill_net.augment.RandomAugment(rotate=[0, 90, 180, 270])
    it = It(np.random.randn(10000, 48, 48), f)
    for a in it:
        time.sleep(0.01)
In [97]:
%%time
%%python
from multiprocessing import Pool
import numpy as np
import neukrill_net.augment
import time
class It(object):
    def __init__(self, a, f):
        # store an array (3D stack of images)
        self.a = a
        # store the function
        self.f = f
        # initialise pool, created once and reused for every batch
        self.p = Pool(8)
        # initialise indices
        self.inds = list(range(self.a.shape[0]))
        # pop a batch from the top
        self.batch_inds = [self.inds.pop(0) for _ in range(100)]
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f, self.a[self.batch_inds, :])

    def __iter__(self):
        return self

    def next(self):
        # check if we've got something pre-computed to return
        if self.inds != []:
            # get the output
            output = self.batch.get(timeout=1)
            # prepare the next batch on the existing pool
            self.batch_inds = [self.inds.pop(0) for _ in range(100)]
            self.batch = self.p.map_async(f, self.a[self.batch_inds, :])
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    f = neukrill_net.augment.RandomAugment(rotate=[0, 90, 180, 270])
    it = It(np.random.randn(10000, 48, 48), f)
    for a in it:
        time.sleep(0.01)
Finally, checking the shape of what comes back from the iterator, so it can be reshaped into a 4D batch:
In [103]:
%%time
%%python
from multiprocessing import Pool
import numpy as np
import neukrill_net.augment
import time
class It(object):
    def __init__(self, a, f):
        # store an array (3D stack of images)
        self.a = a
        # store the function
        self.f = f
        # initialise pool, created once and reused for every batch
        self.p = Pool(8)
        # initialise indices
        self.inds = list(range(self.a.shape[0]))
        # pop a batch from the top
        self.batch_inds = [self.inds.pop(0) for _ in range(100)]
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f, self.a[self.batch_inds, :])

    def __iter__(self):
        return self

    def next(self):
        # check if we've got something pre-computed to return
        if self.inds != []:
            # get the output
            output = self.batch.get(timeout=1)
            # prepare the next batch on the existing pool
            self.batch_inds = [self.inds.pop(0) for _ in range(100)]
            self.batch = self.p.map_async(f, self.a[self.batch_inds, :])
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    f = neukrill_net.augment.RandomAugment(rotate=[0, 90, 180, 270])
    it = It(np.random.randn(10000, 48, 48), f)
    for a in it:
        # the pool hands back a list of 100 images, so the output
        # needs wrapping and reshaping into a 4D batch
        print(np.array(a).shape)
        print(np.array(a).reshape(100, 48, 48, 1).shape)
        break
It looks like, depending on the sleep time, this should be about five times as fast.
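Pulling the pieces together, the version to write into the dataset iterators would look something like the sketch below. This is my own consolidation, not the final neukrill_net code: one pool reused across batches, map_async to overlap the next batch with the current one, and the reshape into a 4D batch at the end (square stands in for RandomAugment, and the class name is mine):

from multiprocessing import Pool
import numpy as np

def square(x):
    # stand-in for the real augmentation function
    return x ** 2

class PrecomputingIterator(object):
    """Sketch: augments the next minibatch in worker processes
    while the caller is busy with the current one."""
    def __init__(self, X, f, batch_size=100, n_procs=8):
        self.X = X
        self.f = f
        self.batch_size = batch_size
        self.inds = list(range(X.shape[0]))
        self.pool = Pool(n_procs)      # one pool, reused for every batch
        self.pending = self._submit()  # precompute the first batch

    def _submit(self):
        batch_inds = [self.inds.pop(0) for _ in range(self.batch_size)]
        return self.pool.map_async(self.f, self.X[batch_inds])

    def __iter__(self):
        return self

    def next(self):
        if self.pending is None:
            raise StopIteration
        batch = self.pending.get()  # block until the workers finish
        # overlap: submit the next batch before handing this one back
        if len(self.inds) >= self.batch_size:
            self.pending = self._submit()
        else:
            self.pending = None
        # reshape the list of 2D images into a 4D batch
        return np.array(batch).reshape(-1, 48, 48, 1)

    __next__ = next  # so the same class works under Python 3

if __name__ == '__main__':
    it = PrecomputingIterator(np.random.randn(1000, 48, 48), square)
    for batch in it:
        pass  # the GPU call would go here
    it.pool.close()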